GRAY_CNN(图片灰度化处理+图片识别) ================================== 应用概述 ------------- 这里将以图片灰度化加图片识别来介绍AI+DSP应用开发的开发流程。 其中图片灰度化可以使用DSP来完成,图片识别则使用AI来完成。 灰度化处理使用灰度化公式来进行,图片识别使用CNN模型。 开发流程 ------------- 1. 定义模型 ~~~~~~~~~~~~~~ - **图片灰度化处理过程。** 这里实现图片灰度化,使用蓝、绿、红三个通道的值进行加权求和,计算出一个灰度值。 这里使用的权重分别是0.114、0.587和0.299,这些数值是基于人眼对不同颜色的敏感度来选择的, 用于将彩色图像转换为灰度图像。公式为: .. math:: Gray = B \times 0.114 + G \times 0.587 + R \times 0.299 以及定义一个Clip操作,确保灰度值在0到255之间。 代码示例如下: .. code-block:: python :linenos: import mindspore as ms from mindspore.train.serialization import export import numpy as np from mindspore import nn, ops class Gray(nn.Cell): def __init__(self): super(Gray, self).__init__() self.split = ops.Split(axis=2, output_num=3) def construct(self, x): x = self.split(x) b, g, r = x x = ( b * 0.114 + g * 0.587 + r * 0.299 ).squeeze(-1) x = ops.clip_by_value(x, 0, 255) return x - **定义AI模型,用于图片识别。** 这里的AI模型是卷积神经网络(CNN)。CNN主要由卷积,池化,激活等算子组成; CNN模型架构如下表所示: .. list-table:: :header-rows: 1 :widths: 15 20 30 20 :align: center * - **层级** - **操作类型** - **参数细节** - **输出维度** * - **输入层** - 灰度图像输入 - 1通道,尺寸 H×W - H×W×1 * - **卷积块1** - Conv2d - 输入通道:1 → 输出通道:32, 卷积核:3×3 - H×W×32 * - - ReLU激活 - 非线性变换 - H×W×32 * - - MaxPool2d - 窗口:2×2, 步长:2 - H/2×W/2×32 * - **卷积块2** - Conv2d - 输入通道:32 → 输出通道:64, 卷积核:3×3 - H/2×W/2×64 * - - ReLU激活 - 非线性变换 - H/2×W/2×64 * - - MaxPool2d - 窗口:2×2, 步长:2 - H/4×W/4×64 * - **卷积块3** - Conv2d - 输入通道:64 → 输出通道:128, 卷积核:3×3 - H/4×W/4×128 * - - ReLU激活 - 非线性变换 - H/4×W/4×128 * - - MaxPool2d - 窗口:2×2, 步长:2 - H/8×W/8×128 * - **全连接层** - Flatten - 展平多维特征图 - 32768 (H/8×W/8×128) * - - Dense - 输入:32768 → 输出:64, 激活:ReLU - 64 * - - Dense (输出层) - 输入:64 → 输出:5, 无激活 - 5 * - **输出层** - 分类结果 - 5类概率分布 - 5 CNN模型用于图片识别,不能直接导出模型,需要训练模型,该部分内容见: :ref:`model_training` 定义CNN模型的代码示例如下: .. code-block:: python :linenos: import mindspore as ms from mindspore import nn class CNN(nn.Cell): def __init__(self): super().__init__() self.conv1 = nn.Conv2d(1, 32, kernel_size=3, has_bias=True) self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2) self.conv2 = nn.Conv2d(32, 64, kernel_size=3, has_bias=True) self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) self.conv3 = nn.Conv2d(64, 128, kernel_size=3, has_bias=True) self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2) self.flatten = nn.Flatten() self.dense1 = nn.Dense(32768, 64) self.dense2 = nn.Dense(64, 5) self.relu = nn.ReLU() def construct(self, x): x = self.relu(self.conv1(x)) x = self.pool1(x) x = self.relu(self.conv2(x)) x = self.pool2(x) x = self.relu(self.conv3(x)) x = self.pool3(x) x = self.flatten(x) x = self.relu(self.dense1(x)) x = self.dense2(x) return x .. _model_training: 2. 模型训练 ~~~~~~~~~~~~~~~ CNN模型需要进行训练才能正确识别,训练需要数据集,这里已经提前准备好数据集,放在Target文件夹下, 使用MindSpore框架进行模型训练,需要导入相关库和模块,定义数据预处理、模型结构、损失函数和优化器等。 重新组网时,直接使用 ``nn.GraphCell()`` 接口会导致权重丢失, 可以在训练前时使用 ``ms.save_checkpoint()`` 接口保存成ckpt文件, 重新组网时,使用 ``ms.load_checkpoint()`` 接口加载ckpt文件即可。 以下代码展示了如何加载数据集,进行10次模型训练,以及导出模型。 训练以及导出模型代码如下: .. code-block:: python :linenos: import mindspore as ms from mindspore.train.serialization import export from mindspore import nn, context import numpy as np from PIL import Image import mindspore.dataset as ds from mindspore.dataset import py_transforms import mindspore.dataset.vision as CV from mindspore.train.callback import LossMonitor batch_size = 32 img_size = (128, 128) data_path = './Target' # 数据预处理 def rescale_to_0_1(image): return image / 255.0 # 自定义函数,添加 color 通道维度 def add_channels(image): if len(image.shape) == 2: # 单个图像,没有 cin_channel 维度 image_four_channels = np.expand_dims(image, axis=0) else: pass return image return image_four_channels def export_cnn(): context.set_context(mode=context.PYNATIVE_MODE, device_target="CPU") resize_op = CV.Resize(img_size) rescale_transform = ms.dataset.transforms.Compose([rescale_to_0_1]) f32_typecast = ms.dataset.transforms.TypeCast(ms.float32) # 将读取的 RGB 转为 GRAY 模式 convert_gray = ms.dataset.vision.ConvertColor(ms.dataset.vision.ConvertMode.COLOR_RGB2GRAY) transform = [convert_gray, resize_op, f32_typecast, rescale_transform, CV.HWC2CHW()] train_data = ds.ImageFolderDataset(dataset_dir=data_path, decode=True, extensions=[".JPEG", ".PNG", ".JPG"]) train_data = train_data.map(input_columns="image", operations= transform) train_data = train_data.map(operations=lambda image: add_channels(image), \ input_columns=["image"], \ output_columns=["image"]) train_data = train_data.batch(batch_size=batch_size) data_iter = train_data.create_dict_iterator() print("数据集加载完成") my_model = CNN() loss = nn.CrossEntropyLoss(reduction='mean') optimizer = nn.Adam(my_model.trainable_params(), learning_rate=0.01) cnn_model = ms.Model(my_model, loss_fn=loss, optimizer=optimizer, metrics={'Accuracy': nn.Accuracy()}) print("网络构建完成") num_epoch = 10 cnn_model.train(num_epoch, train_data,callbacks=[LossMonitor()],dataset_sink_mode=False) inputs = ms.Tensor(np.random.randn(1, 1, 128, 128).astype(np.float32)) ms.save_checkpoint(my_model, 'cnn.ckpt') export(my_model, inputs, file_name='cnn', file_format="MINDIR") 3. 重新组网 ~~~~~~~~~~~~~~~ 在前面章节中,我们已经完成了AI的模型的训练和导出。 需要重新构建成一个新的网络结构,可以使用MindSpore框架的 ``nn.GraphCell()`` 接口来实现。 该部分内容包括加载训练好的cnn模型,灰度化处理,重新构建网络结构,并导出新的模型。 重新组网、导出模型以及测试的代码如下: .. code-block:: python :linenos: import cv2 import mindspore as ms from mindspore.train.serialization import export import numpy as np from mindspore import nn, ops, context from CNN import export_cnn from GRAY import export_gray from Connection import export_connection class_names = ['BRDM_2', 'BTR_60', 'SLICY', 'T62', 'ZSU_23_4'] class_labels = ["装甲侦察车", "装甲运输车", "不明", "坦克", "自行高炮"] class GrayCNN(nn.Cell): def __init__(self): super(GrayCNN, self).__init__() self.split = ops.Split(axis=2, output_num=3) self.conv1 = nn.Conv2d(1, 32, kernel_size=3, has_bias=True) self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2) self.conv2 = nn.Conv2d(32, 64, kernel_size=3, has_bias=True) self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) self.conv3 = nn.Conv2d(64, 128, kernel_size=3, has_bias=True) self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2) self.flatten = nn.Flatten() self.dense1 = nn.Dense(32768, 64) self.dense2 = nn.Dense(64, 5) self.relu = nn.ReLU() self.cnn = nn.GraphCell(ms.load(file_name="cnn.mindir")) def construct(self, x): x = self.split(x) b, g, r = x x = ( b * 0.114 + g * 0.587 + r * 0.299 ).squeeze(-1) x = ops.clip_by_value(x, 0, 255) x = x / 255.0 # 数据归一化 x = x.reshape(1, 1, 128, 128) # 灰度化处理后结果需要重塑成cnn输入形状 x = self.cnn(x) return x if __name__ == "__main__": export_cnn() context.set_context(mode=context.GRAPH_MODE) # 1. 读取图片(保持原始uint8类型) color_image = cv2.imread("image_origin.jpg") # 默认uint8 resized_image = cv2.resize( color_image, (128, 128), # 目标尺寸(width, height) interpolation=cv2.INTER_LINEAR ) resized_image = resized_image.astype(np.float32) resized_image_tensor = ms.Tensor(resized_image) my_model = GrayCNN() param_dict = ms.load_checkpoint("cnn.ckpt") param_not_load, _ = ms.load_param_into_net(my_model, param_dict, strict_load=True) input_tensor = ms.Tensor(np.ones((128, 128, 3), dtype=np.float32)) export(my_model, input_tensor, file_name='gray_cnn', file_format='MINDIR') reload_cnn = nn.GraphCell(ms.load(file_name='gray_cnn.mindir')) reload_cnn = ms.Model(reload_cnn) m = reload_cnn.predict(resized_image_tensor) print(m) output_np = m.asnumpy() if len(output_np.shape) == 2: # 对于分类任务,通常输出是[batch_size, num_classes] output_prob = np.exp(output_np) / np.exp(output_np).sum(axis=1, keepdims=True) predicted_class = np.argmax(output_prob, axis=1) print(f"预测结果: {class_labels[predicted_class[0]]}") 该代码首先定义了一个新的网络结构GrayCNN, 在 ``construct`` 方法中,首先通过gray模型处理输入图像,然后通过归一化和重塑张量,最后cnn模型进行识别。 在主函数中,首先导出cnn模型。 然后加载cnn模型参数,并使用MindSpore的 ``export`` 方法导出新的网络结构。 最后,使用重新组网后的模型进行预测,并输出结果。 4. 转换模型 ~~~~~~~~~~~~~~~~~ 在前面的代码中,我们已经使用了 ``export`` 方法导出了gray_cnn.mindir模型。 要将该模型部署到FT78NE平台,需要将MINDIR格式转换成mindspore lite的ms格式模型。 要将该模型转换为ms格式,可以使用MindSpore的转换工具(converter_lite)。 该工具已经集成在我们的 ``YHFT-IDE`` 中,可以直接在IDE中使用。或者也可以使用命令行工具进行转换。 转换命令如下: .. code-block:: bash ./converter_lite --fmk=MINDIR --modelFile=gray_cnn.mindir --outputFile=gray_cnn 转换完成后,会在当前目录下生成gray_cnn.ms模型文件。该模型可以使用可视化工具(netron)可以打开该文件,查看模型结构。 该工具可以直接在 ``YHFT-IDE`` 中使用,可以从官网下载使用,也可以在线使用。在线地址为:https://netron.app/。 该模型文件可视化如图所示: .. image:: ../../_static/gray_cnn.png :width: 80% :align: center :alt: gray_cnn模型结构图 :scale: 50% 5. 部署和运行程序 ~~~~~~~~~~~~~~~~~~ - 在IDE中新建一个Python项目,将下面的 :ref:`python_code` 代码拷贝到项目中,并运行。 运行成功后,会在当前目录下生成一个名为 ``gray_cnn.midir`` 的模型文件,以及输出图片的预测结果。 结果如图所示: .. image:: ../../_static/python_gray_cnn_res.png :width: 80% :align: center :alt: python运行结果 :scale: 50% 将该文件转换为ms格式,并将ms格式模型和测试图片拷贝到FT78NE平台中。 - 打开 ``YHFT-IDE`` ,新建工程。输入工程名、路径,工程类型选择 ``Heterogeneous`` , 输入交叉编译工具路径,然后点确定。会生成一个异构模板工程。通过修改 ``data_handler.cc`` 文件中的函数来调整输入输出数据, 输入改成读取的图片路径;输出改成相应的后处理。在 ``main`` 函数设置运行后端;修改 ``CMakeLists.txt`` 文件, 添加openCV库的lib和include路径, 最后编译该工程,编译成功后将build文件夹下的 ``main`` 拷贝到FT78NE平台中, 要和gray_cnn.ms模型同一个文件夹下。 输入的c++代码示例如下: .. code-block:: c++ :linenos: #include int readImage(float *reslut, std::string imagePath) { cv::Mat color_image = cv::imread(imagePath.c_str(), cv::IMREAD_COLOR); if (color_image.empty()) { fprintf(stderr, "read image failed\n"); return -1; } int width = 128; int height = 128; cv::Mat resized_image; cv::resize(color_image, resized_image, cv::Size(width, height), 0, 0, cv::INTER_LINEAR); resized_image.convertTo(resized_image, CV_32F); const int channels = resized_image.channels(); const int element_count = width * height * channels; for (int i = 0; i < height; ++i) { const float *src_ptr = resized_image.ptr(i); float *dst_ptr = reslut + i * width * channels; std::memcpy(dst_ptr, src_ptr, width * channels * sizeof(float)); } return 0; } 设置后端代码如下: .. code-block:: c++ :linenos: auto context = std::make_shared(); auto &device_list = context->MutableDeviceInfo(); context->SetBuiltInDelegate(mindspore::DelegateMode::kPNNA); auto cpu_info = std::make_shared(); auto dsp_info = std::make_shared(); device_list.push_back(dsp_info); device_list.push_back(cpu_info); 输出结果后处理代码如下: .. code-block:: c++ :linenos: std::vector floatArrayToVector(const float *array, size_t size) { std::vector vec(array, array + size); return vec; } std::vector numpy_exp(const std::vector &x) { std::vector result; result.reserve(x.size()); for (float num : x) { result.push_back(exp(num)); } return result; } float sum_rows(const std::vector &vec, int axis) { float sum = std::accumulate(vec.begin(), vec.end(), 0.0); return sum; } std::vector> convertTo2D(const std::vector &vec, int rows) { std::vector> result; if (vec.empty() || rows <= 0) { return result; } int cols = (vec.size() + rows - 1) / rows; // 计算列数 for (int i = 0; i < rows; ++i) { std::vector row; for (int j = 0; j < cols; ++j) { size_t index = i * cols + j; // 计算索引 if (index < vec.size()) { row.push_back(vec[index]); } else { row.push_back(0); // 填充剩余空间,或者你可以选择不填充 } } result.push_back(row); } return result; } void GetOutputData(std::vector &outputs) { for (auto tensor : outputs) { std::cout << "tensor name is:" << tensor.Name() << " tensor size is:" << tensor.DataSize() << " tensor elements num is:" << tensor.ElementNum() << std::endl; auto out_data = reinterpret_cast(tensor.Data().get()); std::cout << "output data is:"; for (int i = 0; i < tensor.ElementNum() && i <= 50; i++) { std::cout << out_data[i] << " "; } std::cout << std::endl; std::vector out = floatArrayToVector(out_data, tensor.ElementNum()); std::vector exp_x = numpy_exp(out); float sums = sum_rows(exp_x, 1); std::vector x1; for (float num : exp_x) { x1.push_back(num / sums); } std::vector> twoD = convertTo2D(x1, 1); std::vector argmax_indices = argmax(twoD); std::vector Predicted_class = {"装甲侦察车", "装甲运输车", "不明", "坦克", "自行高炮"}; std::cout << "预测结果: " << Predicted_class[argmax_indices[0]] << std::endl; } } - 在FT78NE上运行可执行文件 ``main`` ,观察输出结果。与预期结果对比,验证模型推理的正确性。 执行命令如下: .. code-block:: bash :linenos: ./main gray_cnn.ms image_origin.jpg 执行结果如下图: .. image:: ../../_static/ft78ne_gray_cnn_output.png :width: 80% :align: center :alt: FT78NE运行结果 :scale: 50% .. _python_code: python完整代码示例 ------------------- .. code-block:: python :linenos: # CNN.py import mindspore as ms from mindspore.train.serialization import export from mindspore import nn, context import numpy as np from PIL import Image import mindspore.dataset as ds from mindspore.dataset import py_transforms import mindspore.dataset.vision as CV from mindspore.train.callback import LossMonitor # # Define directories and parameters batch_size = 32 img_size = (128, 128) data_path = '../cnn-sar/Target' # 数据预处理 def rescale_to_0_1(image): return image / 255.0 # 自定义函数,添加 color 通道维度 def add_channels(image): if len(image.shape) == 2: # 单个图像,没有 cin_channel 维度 # print("before ===> ", image.shape) # 添加 color 通道维度,(128x128) => (1, 128, 128) image_four_channels = np.expand_dims(image, axis=0) # print("after ===> ", image_four_channels.shape) else: pass return image return image_four_channels class CNN(ms.nn.Cell): def __init__(self): super().__init__() self.conv1 = nn.Conv2d(1, 32, kernel_size=3, has_bias=True) self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2) self.conv2 = nn.Conv2d(32, 64, kernel_size=3, has_bias=True) self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) self.conv3 = nn.Conv2d(64, 128, kernel_size=3, has_bias=True) self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2) self.flatten = nn.Flatten() self.dense1 = nn.Dense(32768, 64) self.dense2 = nn.Dense(64, 5) self.relu = nn.ReLU() def construct(self, x): x = self.relu(self.conv1(x)) x = self.pool1(x) x = self.relu(self.conv2(x)) x = self.pool2(x) x = self.relu(self.conv3(x)) x = self.pool3(x) x = self.flatten(x) x = self.relu(self.dense1(x)) x = self.dense2(x) return x def export_cnn(): context.set_context(mode=context.PYNATIVE_MODE, device_target="CPU") resize_op = CV.Resize(img_size) rescale_transform = ms.dataset.transforms.Compose([rescale_to_0_1]) f32_typecast = ms.dataset.transforms.TypeCast(ms.float32) # 将读取的 RGB 转为 GRAY 模式 convert_gray = ms.dataset.vision.ConvertColor(ms.dataset.vision.ConvertMode.COLOR_RGB2GRAY) transform = [convert_gray, resize_op, f32_typecast, rescale_transform, CV.HWC2CHW()] train_data = ds.ImageFolderDataset(dataset_dir=data_path, decode=True, extensions=[".JPEG", ".PNG", ".JPG"]) train_data = train_data.map(input_columns="image", operations= transform) train_data = train_data.map(operations=lambda image: add_channels(image), \ input_columns=["image"], \ output_columns=["image"]) train_data = train_data.batch(batch_size=batch_size) data_iter = train_data.create_dict_iterator() print("数据集加载完成") my_model = CNN() loss = nn.CrossEntropyLoss(reduction='mean') optimizer = nn.Adam(my_model.trainable_params(), learning_rate=0.01) cnn_model = ms.Model(my_model, loss_fn=loss, optimizer=optimizer, metrics={'Accuracy': nn.Accuracy()}) print("网络构建完成") num_epoch = 10 cnn_model.train(num_epoch, train_data,callbacks=[LossMonitor()],dataset_sink_mode=False) inputs = ms.Tensor(np.random.randn(1, 1, 128, 128).astype(np.float32)) ms.save_checkpoint(my_model, 'cnn.ckpt') export(my_model, inputs, file_name='cnn', file_format="MINDIR") .. code-block:: python :linenos: # GRAY_CNN.py import cv2 import mindspore as ms from mindspore.train.serialization import export import numpy as np from mindspore import nn, ops, context from CNN import export_cnn class_names = ['BRDM_2', 'BTR_60', 'SLICY', 'T62', 'ZSU_23_4'] class_labels = ["装甲侦察车", "装甲运输车", "不明", "坦克", "自行高炮"] class GrayCNN(nn.Cell): def __init__(self): super(GrayCNN, self).__init__() self.split = ops.Split(axis=2, output_num=3) self.conv1 = nn.Conv2d(1, 32, kernel_size=3, has_bias=True) self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2) self.conv2 = nn.Conv2d(32, 64, kernel_size=3, has_bias=True) self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2) self.conv3 = nn.Conv2d(64, 128, kernel_size=3, has_bias=True) self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2) self.flatten = nn.Flatten() self.dense1 = nn.Dense(32768, 64) self.dense2 = nn.Dense(64, 5) self.relu = nn.ReLU() self.cnn = nn.GraphCell(ms.load(file_name="cnn.mindir")) def construct(self, x): x = self.split(x) b, g, r = x x = ( b * 0.114 + g * 0.587 + r * 0.299 ).squeeze(-1) x = ops.clip_by_value(x, 0, 255) x = x / 255.0 # 数据归一化 x = x.reshape(1, 1, 128, 128) # 灰度化处理后结果需要重塑成cnn输入形状 x = self.cnn(x) return x if __name__ == "__main__": export_cnn() context.set_context(mode=context.GRAPH_MODE) # 1. 读取图片(保持原始uint8类型) color_image = cv2.imread("image_origin.jpg") # 默认uint8 resized_image = cv2.resize( color_image, (128, 128), # 目标尺寸(width, height) interpolation=cv2.INTER_LINEAR ) resized_image = resized_image.astype(np.float32) resized_image_tensor = ms.Tensor(resized_image) my_model = GrayCNN() param_dict = ms.load_checkpoint("cnn.ckpt") param_not_load, _ = ms.load_param_into_net(my_model, param_dict, strict_load=True) input_tensor = ms.Tensor(np.ones((128, 128, 3), dtype=np.float32)) export(my_model, input_tensor, file_name='gray_cnn', file_format='MINDIR') reload_cnn = nn.GraphCell(ms.load(file_name='gray_cnn.mindir')) reload_cnn = ms.Model(reload_cnn) m = reload_cnn.predict(resized_image_tensor) print(m) output_np = m.asnumpy() if len(output_np.shape) == 2: # 对于分类任务,通常输出是[batch_size, num_classes] output_prob = np.exp(output_np) / np.exp(output_np).sum(axis=1, keepdims=True) predicted_class = np.argmax(output_prob, axis=1) print(f"预测结果: {class_labels[predicted_class[0]]}")